{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Redis Stream Examples"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## basic config"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "redis_host = \"redis\"\n",
    "stream_key = \"skey\"\n",
    "stream2_key = \"s2key\"\n",
    "group1 = \"grp1\"\n",
    "group2 = \"grp2\""
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## connection"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "True"
      ]
     },
     "execution_count": 2,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "import redis\n",
    "from time import time\n",
    "from redis.exceptions import ConnectionError, DataError, NoScriptError, RedisError, ResponseError\n",
    "\n",
    "r = redis.Redis( redis_host )\n",
    "r.ping()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## xadd and xread"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### add some data to the stream"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "stream length: 10\n"
     ]
    }
   ],
   "source": [
    "# append ten entries; each one stores the current time and a counter value\n",
    "for counter in range(10):\n",
    "    entry = { 'ts': time(), 'v': counter }\n",
    "    r.xadd( stream_key, entry )\n",
    "stream_length = r.xlen( stream_key )\n",
    "print( f\"stream length: {stream_length}\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### read some data from the stream"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[[b'skey', [(b'1657571033115-0', {b'ts': b'1657571033.1128936', b'v': b'0'}), (b'1657571033117-0', {b'ts': b'1657571033.1176307', b'v': b'1'})]]]\n"
     ]
    }
   ],
   "source": [
    "# fetch at most two entries of stream_key, reading from the start of the stream (id 0)\n",
    "l = r.xread( streams={stream_key: 0}, count=2 )\n",
    "print(l)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### extract data from the returned structure"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "got data from stream: b'skey'\n",
      "id: b'1657571033115-0' value: b'0'\n",
      "id: b'1657571033117-0' value: b'1'\n"
     ]
    }
   ],
   "source": [
    "# the reply holds one [stream_name, entries] pair per stream that was read\n",
    "stream_name, entries = l[0]\n",
    "print( f\"got data from stream: {stream_name}\")\n",
    "# every entry is an (entry_id, fields) pair; `entry_id` avoids shadowing the builtin `id`\n",
    "for entry_id, fields in entries:\n",
    "    print( f\"id: {entry_id} value: {fields[b'v']}\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### read more data from the stream\n",
    "If we call `xread` again with the same arguments, we get the same data."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "id: b'1657571033115-0' value: b'0'\n",
      "id: b'1657571033117-0' value: b'1'\n"
     ]
    }
   ],
   "source": [
    "# same arguments as before, so the same two entries come back\n",
    "l = r.xread( count=2, streams={stream_key:0} )\n",
    "# `entry_id` instead of `id`, so the builtin `id` is not shadowed in the kernel namespace\n",
    "for entry_id, fields in l[0][1]:\n",
    "    print( f\"id: {entry_id} value: {fields[b'v']}\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To get new data we need to change the id passed to the call: use the id of the last entry returned by the previous read."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "id: b'1657571033118-0' value: b'2'\n",
      "id: b'1657571033119-0' value: b'3'\n"
     ]
    }
   ],
   "source": [
    "# continue after the last entry returned by the previous read\n",
    "last_id_returned = l[0][1][-1][0]\n",
    "l = r.xread( count=2, streams={stream_key: last_id_returned} )\n",
    "# `entry_id` instead of `id`, so the builtin `id` is not shadowed in the kernel namespace\n",
    "for entry_id, fields in l[0][1]:\n",
    "    print( f\"id: {entry_id} value: {fields[b'v']}\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "id: b'1657571033119-1' value: b'4'\n",
      "id: b'1657571033121-0' value: b'5'\n"
     ]
    }
   ],
   "source": [
    "# same step once more: resume from the last id seen in the previous reply\n",
    "last_id_returned = l[0][1][-1][0]\n",
    "l = r.xread( count=2, streams={stream_key: last_id_returned} )\n",
    "# `entry_id` instead of `id`, so the builtin `id` is not shadowed in the kernel namespace\n",
    "for entry_id, fields in l[0][1]:\n",
    "    print( f\"id: {entry_id} value: {fields[b'v']}\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To get only entries added after the call is issued, pass the special id `$`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "stream length: 10\n",
      "after 5s block, got an empty list [], no *new* messages on the stream\n",
      "stream length: 10\n"
     ]
    }
   ],
   "source": [
    "length_before = r.xlen( stream_key )\n",
    "print( f\"stream length: {length_before}\")\n",
    "# block for up to 5000 ms, asking only for entries newer than the current end of the stream ('$')\n",
    "l = r.xread( streams={stream_key: '$'}, count=1, block=5000 )\n",
    "print( f\"after 5s block, got an empty list {l}, no *new* messages on the stream\")\n",
    "length_after = r.xlen( stream_key )\n",
    "print( f\"stream length: {length_after}\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 2nd stream\n",
    "Add some messages to a 2nd stream"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "stream length: 10\n"
     ]
    }
   ],
   "source": [
    "# ten entries with the values 1000..1009; unlike the first stream they carry no 'ts' field\n",
    "for value in range(1000, 1010):\n",
    "    r.xadd( stream2_key, { 'v': value } )\n",
    "stream2_length = r.xlen( stream2_key )\n",
    "print( f\"stream length: {stream2_length}\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "get messages from the 2 streams"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "got from b'skey' the entry [(b'1657571033115-0', {b'ts': b'1657571033.1128936', b'v': b'0'})]\n",
      "got from b's2key' the entry [(b'1657571042111-0', {b'v': b'1000'})]\n"
     ]
    }
   ],
   "source": [
    "# ask for one entry from each stream; the reply has one [stream_name, entries] pair per stream\n",
    "l = r.xread( streams={stream_key: 0, stream2_key: 0}, count=1 )\n",
    "for stream_name, entries in l:\n",
    "    print(f\"got from {stream_name} the entry {entries}\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# stream groups\n",
    "With groups it is possible to track on the Redis side, for many consumers, which messages have already been consumed.\n",
    "## add some data to streams\n",
    "Add 10 more messages to each of the 2 streams (each stream then holds 20 messages)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "stream 'skey' length: 20\n",
      "stream 's2key' length: 20\n"
     ]
    }
   ],
   "source": [
    "def add_some_data_to_stream( sname, key_range ):\n",
    "    \"\"\"Append one entry per value of `key_range` to the stream `sname`, then print its length.\n",
    "\n",
    "    Each entry has a `ts` field (the current `time()`) and a `v` field (the value).\n",
    "    \"\"\"\n",
    "    for i in key_range:\n",
    "        r.xadd( sname, { 'ts': time(), 'v': i } )\n",
    "    # report the length of the stream that was just written, not always `stream_key`\n",
    "    print( f\"stream '{sname}' length: {r.xlen( sname )}\")\n",
    "\n",
    "add_some_data_to_stream( stream_key, range(0,10) )\n",
    "add_some_data_to_stream( stream2_key, range(1000,1010) )"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## use a group to read from the stream\n",
    "* create a group `grp1` with the stream `skey`, and\n",
    "* create a group `grp2` with the streams `skey` and `s2key`\n",
    "\n",
    "Use `xinfo_groups` to verify the result of the group creation."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "skey -> group name: b'grp1' with 0 consumers and b'0-0' as last read id\n",
      "skey -> group name: b'grp2' with 0 consumers and b'0-0' as last read id\n",
      "s2key -> group name: b'grp2' with 0 consumers and b'0-0' as last read id\n"
     ]
    }
   ],
   "source": [
    "def create_group( skey, gname ):\n",
    "    \"\"\"Create the consumer group `gname` on the stream `skey`, starting at id 0.\n",
    "\n",
    "    A `ResponseError` raised by the call is printed instead of being propagated.\n",
    "    \"\"\"\n",
    "    try:\n",
    "        r.xgroup_create( groupname=gname, name=skey, id=0 )\n",
    "    except ResponseError as err:\n",
    "        print(f\"raised: {err}\")\n",
    "\n",
    "# group1 read the stream 'skey'\n",
    "create_group( stream_key, group1 )\n",
    "# group2 read the streams 'skey' and 's2key'\n",
    "create_group( stream_key, group2 )\n",
    "create_group( stream2_key, group2 )\n",
    "\n",
    "def group_info( skey ):\n",
    "    \"\"\"Print the name, consumer count and last delivered id of each group of stream `skey`.\"\"\"\n",
    "    for grp in r.xinfo_groups( name=skey ):\n",
    "        summary = f\"{skey} -> group name: {grp['name']} with {grp['consumers']} consumers\"\n",
    "        summary += f\" and {grp['last-delivered-id']} as last read id\"\n",
    "        print( summary )\n",
    "    \n",
    "group_info( stream_key )\n",
    "group_info( stream2_key )"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## group read\n",
    "The `xreadgroup` method reads from a stream on behalf of a consumer in a stream group."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {},
   "outputs": [],
   "source": [
    "def print_xreadgroup_reply( reply, group = None, run = None):\n",
    "    \"\"\"Print every element of an `xreadgroup` reply and optionally run a callback on it.\n",
    "\n",
    "    reply: sequence of per-stream items; item[0] is the stream name, item[1] its elements.\n",
    "    group: passed through unchanged as the 2nd argument of `run`.\n",
    "    run:   optional callable, invoked as run( stream_name, group, element_id ).\n",
    "    \"\"\"\n",
    "    for per_stream in reply:\n",
    "        for element in per_stream[1]:\n",
    "            stream_name, element_id = per_stream[0], element[0]\n",
    "            print( f\"got element {element_id}from stream {stream_name}\" )\n",
    "            if run is None:\n",
    "                continue\n",
    "            run( stream_name, group, element_id )"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "got element b'1657571033115-0'from stream b'skey'\n",
      "got element b'1657571033117-0'from stream b'skey'\n"
     ]
    }
   ],
   "source": [
    "# consumer 'c' of group1 asks for up to 2 entries not yet delivered to the group ('>')\n",
    "d = r.xreadgroup( groupname=group1, consumername='c',\n",
    "                  streams={stream_key: '>'}, count=2, block=10 )\n",
    "print_xreadgroup_reply( d )"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "A **2nd consumer** of the same stream group gets only messages that have not yet been delivered to the group."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "got element b'1657571033118-0'from stream b'skey'\n",
      "got element b'1657571033119-0'from stream b'skey'\n"
     ]
    }
   ],
   "source": [
    "# read some messages on group1 with a second consumer, 'c2'\n",
    "# note: this rebinds `d`, so `d` now holds only the entries delivered to 'c2'\n",
    "d = r.xreadgroup( groupname=group1, consumername='c2', block=10,\n",
    "                  count=2, streams={stream_key:'>'})\n",
    "print_xreadgroup_reply( d )"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "But a **2nd stream group** can read the already delivered messages again.\n",
    "\n",
    "Note that the 2nd stream group also includes the 2nd stream.\n",
    "That can be identified in the reply (1st element of the reply list)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "got element b'1657571033115-0'from stream b'skey'\n",
      "got element b'1657571033117-0'from stream b'skey'\n",
      "got element b'1657571042111-0'from stream b's2key'\n",
      "got element b'1657571042113-0'from stream b's2key'\n"
     ]
    }
   ],
   "source": [
    "# group2 was created on both streams, so one call can ask each of them for up to 2 new entries\n",
    "group2_streams = {stream_key: '>', stream2_key: '>'}\n",
    "d2 = r.xreadgroup( groupname=group2, consumername='c', block=10,\n",
    "                   count=2, streams=group2_streams )\n",
    "print_xreadgroup_reply( d2 )"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To check for pending messages (messages delivered but not yet acknowledged) we can use `xpending`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "4 pending messages on 'skey' for group 'grp1'\n",
      "2 pending messages on 'skey' for group 'grp2'\n",
      "2 pending messages on 's2key' for group 'grp2'\n"
     ]
    }
   ],
   "source": [
    "def print_pending_info( key_group ):\n",
    "    \"\"\"Print the pending count (messages read but not acked) for each pair in `key_group`.\n",
    "\n",
    "    key_group: iterable of (stream_name, group_name) pairs.\n",
    "    \"\"\"\n",
    "    for stream_name, group_name in key_group:\n",
    "        summary = r.xpending( name=stream_name, groupname=group_name )\n",
    "        pending = summary.get('pending')\n",
    "        print( f\"{pending} pending messages on '{stream_name}' for group '{group_name}'\" )\n",
    "    \n",
    "print_pending_info( ((stream_key,group1),(stream_key,group2),(stream2_key,group2)) )"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## ack\n",
    "Acknowledge some messages with `xack`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "got element b'1657571033118-0'from stream b'skey'\n",
      "got element b'1657571033119-0'from stream b'skey'\n"
     ]
    }
   ],
   "source": [
    "# do acknowledges for group1\n",
    "def toack( k, g, e ):\n",
    "    \"\"\"Acknowledge element id `e` of stream `k` for group `g`; returns the `xack` result.\"\"\"\n",
    "    return r.xack( k, g, e )\n",
    "\n",
    "# `d` holds the reply of the latest group1 read, so only those elements are acknowledged\n",
    "print_xreadgroup_reply( d, group=group1, run=toack )"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2 pending messages on 'skey' for group 'grp1'\n",
      "2 pending messages on 'skey' for group 'grp2'\n",
      "2 pending messages on 's2key' for group 'grp2'\n"
     ]
    }
   ],
   "source": [
    "# check pending again\n",
    "print_pending_info( ((stream_key,group1),(stream_key,group2),(stream2_key,group2)) )"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Read and ack all the not yet delivered messages on the `group1`. The messages delivered earlier and never acknowledged stay pending."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "got element b'1657571033119-1'from stream b'skey'\n",
      "got element b'1657571033121-0'from stream b'skey'\n",
      "got element b'1657571033121-1'from stream b'skey'\n",
      "got element b'1657571033121-2'from stream b'skey'\n",
      "got element b'1657571033122-0'from stream b'skey'\n",
      "got element b'1657571033122-1'from stream b'skey'\n",
      "got element b'1657571049557-0'from stream b'skey'\n",
      "got element b'1657571049557-1'from stream b'skey'\n",
      "got element b'1657571049558-0'from stream b'skey'\n",
      "got element b'1657571049559-0'from stream b'skey'\n",
      "got element b'1657571049559-1'from stream b'skey'\n",
      "got element b'1657571049559-2'from stream b'skey'\n",
      "got element b'1657571049560-0'from stream b'skey'\n",
      "got element b'1657571049562-0'from stream b'skey'\n",
      "got element b'1657571049563-0'from stream b'skey'\n",
      "got element b'1657571049563-1'from stream b'skey'\n",
      "2 pending messages on 'skey' for group 'grp1'\n"
     ]
    }
   ],
   "source": [
    "# fetch up to 100 entries not yet delivered to group1 and acknowledge each one as it is printed\n",
    "d = r.xreadgroup( groupname=group1, consumername='c',\n",
    "                  streams={stream_key: '>'}, count=100, block=10 )\n",
    "print_xreadgroup_reply( d, run=toack, group=group1 )\n",
    "# entries delivered by earlier reads and never acknowledged are still counted as pending\n",
    "print_pending_info( ((stream_key, group1),) )"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "But stream length will be the same after the `xack` of all messages on the `group1`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 24,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "20"
      ]
     },
     "execution_count": 24,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "r.xlen(stream_key)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## delete all\n",
    "To remove the messages we need to delete them explicitly with `xdel`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 25,
   "metadata": {},
   "outputs": [],
   "source": [
    "# read the stream from id 0 without a count limit\n",
    "s1 = r.xread( streams={stream_key:0} )\n",
    "for stream_name, messages in s1:\n",
    "    # delete every returned entry by id (plain loop: the calls are made for their side effect only)\n",
    "    for message in messages:\n",
    "        r.xdel( stream_name, message[0] )"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "stream length"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 26,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "0"
      ]
     },
     "execution_count": 26,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "r.xlen(stream_key)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "But after the `xdel` the 2nd group can no longer read its not yet processed messages from `skey`: only `s2key` still returns entries."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 27,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "got element b'1657571042113-1'from stream b's2key'\n",
      "got element b'1657571042114-0'from stream b's2key'\n"
     ]
    }
   ],
   "source": [
    "# same group2 read as before, issued after the entries of stream_key were deleted\n",
    "group2_streams = {stream_key: '>', stream2_key: '>'}\n",
    "d2 = r.xreadgroup( groupname=group2, consumername='c', block=10,\n",
    "                   count=2, streams=group2_streams )\n",
    "print_xreadgroup_reply( d2 )"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.8.10"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
